package com.stronglink.esm27.datasync.handle;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jboss.logging.Logger;

import com.alibaba.fastjson.JSON;
import com.stronglink.esm27.datasync.entity.CloneTableInfo;
import com.stronglink.esm27.datasync.entity.ResultMessage;
import com.stronglink.esm27.datasync.exception.SyncExistedException;
import com.stronglink.esm27.datasync.mq.ActiveMqClient;
import com.stronglink.esm27.datasync.service.IDataSyncService;
import com.stronglink.esm27.datasync.service.ISQLListener;

/**
 * 将源数据库中的表数据拷贝到目标数据库的表中
 * 
 * @author yuzhantao
 *
 */
public class TableCloneHandle implements Runnable {
	protected final static Logger logger = Logger.getLogger(TableCloneHandle.class);
	protected final static Set<String> nowHandleDatabaseSet = new HashSet<>(); // 正在处理的数据库列表
	protected final static int ERROR_CODE = -1; // 开始同步的code
	protected final static int START_SYNC_CODE = 1000; // 开始同步任务的code
	protected final static int START_SYNC_TABLE_CODE = 1001; // 开始同步表的code
	protected final static int OVER_SYNC_CODE = 1002; // 结束同步表的code
	protected final static int DATA_SYNC_CODE = 1003; // 数据同步的code
	protected final static String SEND_MQ_TOPIC = "esm27.data.clone"; // 发送的mq主题名
	String srcDataSurceParams = null; // 获取源数据源参数
	String destDataSurecParams = null; // 获取目标数据源参数
	IDataSyncService dataSyncService;
	ActiveMqClient mqClient;
	String[] tables;

	public TableCloneHandle(IDataSyncService dataSyncService, ActiveMqClient mq, String srcDataSurceParams,
			String destDataSurecParams, String[] tables) throws SyncExistedException {
		// 如果有线程正在处理该数据库，则抛出异常。
		if (nowHandleDatabaseSet.contains(destDataSurecParams)) {
			throw new SyncExistedException();
		}
		this.dataSyncService = dataSyncService;
		this.mqClient = mq;
		this.srcDataSurceParams = srcDataSurceParams;
		this.destDataSurecParams = destDataSurecParams;
		this.tables = tables;
	}

	@Override
	public void run() {
		nowHandleDatabaseSet.add(destDataSurecParams);
		try {
			TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), START_SYNC_CODE, tables);
			for (String table : this.tables) {
				final List<Map<String, Object>> tableDatas;
				try {
					tableDatas = dataSyncService.getTableAllContent(srcDataSurceParams, table); // 从源数据库表中获取数据
					dataSyncService.clearTableContent(destDataSurecParams, table); // 清除目标数据库中的数据
					dataSyncService.insert(destDataSurecParams, table, tableDatas, new ISQLListener() {

						@Override
						public void onConnectioned() {
							logger.info("数据库已连接,准备克隆[" + table + "]表，预计拷贝[" + tableDatas.size() + "]条数据.");
							CloneTableInfo cti = new CloneTableInfo();
							cti.setCount(tableDatas.size()); // 记录需要同步的数据总行数
							cti.setTableName(table); // 表明
							TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), START_SYNC_TABLE_CODE, cti);
						}

						@Override
						public void onExecuteQuery(String tableName, String sql, ResultSet resultSet) {
						}

						@Override
						public void onExecuteUpdate(String tableName, String sql, int resultCount) {
							logger.info("数据库操作 tableName=" + tableName + "   返回数量=" + resultCount + "		SQL=" + sql);
							CloneTableInfo cti = new CloneTableInfo();
							cti.setCount(resultCount); // 记录更新的数据行数
							cti.setTableName(tableName); // 表明
							TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), DATA_SYNC_CODE, cti);
						}

						@Override
						public void onClose() {
							logger.info("数据库连接已关闭");
						}

						@Override
						public void onError(Throwable throwable) {
							logger.error("数据库操作异常:" + throwable.getMessage());
							TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), ERROR_CODE,
									throwable.getMessage());
						}
					});
				} catch (SQLException e) {
					TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), ERROR_CODE, e.getMessage());
				} 
				
				// 默认同步时如果有消防车表，将它的状态改为非在用（既2）
				if("t_fire_engine".equals(table)) {
					dataSyncService.updateSql(destDataSurecParams, "update t_fire_engine set status=2"); 
				}
			}
			
			this.sendMq(getTopic(), OVER_SYNC_CODE, null);
		}catch(Exception e) {
			TableCloneHandle.this.sendMq(TableCloneHandle.this.getTopic(), ERROR_CODE, e.getMessage());
		} finally {
			nowHandleDatabaseSet.remove(destDataSurecParams);
		}

		
	}

	/**
	 * 获取发送的mq主题
	 * 
	 * @return
	 */
	private String getTopic() {
		return SEND_MQ_TOPIC;
	}

	/**
	 * 发送mq消息
	 * 
	 * @param topic 主题名
	 * @param code  业务代码
	 * @param obj   业务数据
	 */
	private void sendMq(String topic, int code, Object obj) {
		ResultMessage rm = new ResultMessage();
		rm.setCode(code);
		rm.setMessage(obj);
		this.mqClient.send(topic, JSON.toJSONString(rm));
	}
}
